热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

连续性|量词_FlinkCEPFlink的复杂事件处理

篇首语:本文由编程笔记#小编为大家整理,主要介绍了FlinkCEP-Flink的复杂事件处理相关的知识,希望对你有一定的参考价值。1FlinkCEP

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Flink CEP - Flink的复杂事件处理相关的知识,希望对你有一定的参考价值。



1 Flink CEP 是什么

FlinkCEP - Flink的复杂事件处理。它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分


2 Flink CEP 特点

目标:从有序的简单事件流中发现一些高阶特征
输入:一个或多个由简单事件构成的事件流
处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
输出:满足规则的复杂事件


3 Flink CEP 应用场景

风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。


4 Flink CEP原理

Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态、中间状态、最终状态三种,边分为take、ignore、proceed三种。

take:必须存在一个条件判断,当到来的消息满足take边条件判断时,把这个消息放入结果集,将状态转移到下一状态。
ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。
proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。
模式API (Pattern API)

模式API可以让你定义想从输入流中抽取的复杂模式序列

个体模式:组成复杂规则的每一个单独的模式定义,就是“个体模式”

A 量词

在FlinkCEP中,你可以通过这些方法指定循环模式:


  • pattern.oneOrMore() 指定期望一个给定事件出现一次或者多次的模式
  • pattern.times(#fTimes) 指定期望一个给定事件出现特定次数的模式,例如出现4次A
  • pattern.times(#fromTimes, #toTimes) 指定期望一个给定事件出现次数在一个最小值和最大值中间的模式,比如出现2-4次A
  • pattern.greedy() 方法让循环模式变成贪心的
  • pattern.optional() 方法让所有的模式变成可选的,不管是否是循环模式
    // 期望出现4次start.times(4)// 期望出现0或者4次
  • start.times(4).optional()// 期望出现2、3或者4次 -
  • start.times(2, 4)// 期望出现2、3或者4次,并且尽可能的重复次数多
  • start.times(2, 4).greedy()// 期望出现0、2、3或者4次
  • start.times(2, 4).optional()// 期望出现0、2、3或者4次,并且尽可能的重复次数多
  • start.times(2, 4).optional().greedy()// 期望出现1到多次
  • start.oneOrMore()// 期望出现1到多次,并且尽可能的重复次数多
  • start.oneOrMore().greedy()// 期望出现0到多次
  • start.oneOrMore().optional()// 期望出现0到多次,并且尽可能的重复次数
  • start.oneOrMore().optional().greedy()// 期望出现2到多次
  • start.timesOrMore(2)// 期望出现2到多次,并且尽可能的重复次数
  • start.timesOrMore(2).greedy()// 期望出现0、2或多次
  • start.timesOrMore(2).optional()// 期望出现0、2或多次,并且尽可能的重复次数多
  • start.timesOrMore(2).optional().greedy()B 条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,指定判断事件属性的条件可以通过


  • pattern.where() 增加新的条件,多个条件应该同时满足 pattern.where(event => … /*
    一些判断条件 */)

  • pattern.or() 增加新的条件,多个条件满足任意一个都可以

  • pattern.where(event =>
    … /* 一些判断条件 /) .or(event => … / 替代条件 */)

  • pattern.until()
    为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。如"(a+ until b)"
    (一个或者更多的"a"直到"b")

  • pattern.oneOrMore().until(event => … /* 替代条件
    */)以上这些可以是可迭代的

  • subtype(subClass) 为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式

  • pattern.subtype(classOf[SubEvent]) oneOrMore()
    指定模式期望匹配到的事件至少出现一次。默认(在子事件间)使用松散的内部连续性。关于内部连续性的更多信息可以参考连续性。提示:推荐使用until()或者within()来清理状态。

  • pattern.oneOrMore() timesOrMore(#times)
    指定模式期望匹配到的事件正好出现的次数。默认(在子事件间)使用松散的内部连续性。关于内部连续性的更多信息可以参考连续性。

  • pattern.timesOrMore(2) optional()
    指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。

  • pattern.oneOrMore().optional() greedy()
    指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。

  • pattern.oneOrMore().greedy()迭代条件 :
    这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。

  • pattern.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx)
    => lazy val sum = ctx.getEventsForPattern(“middle”).map(_.getPrice).sum
    value.getName.startsWith(“foo”) && sum &#43; value.getPrice <5.0 )组合模式:很多个体模式组合起来&#xff0c;就形成了整个的模式序列

FlinkCEP支持事件之间如下形式的连续策略

// 严格连续
val strict: Pattern[Event, _] &#61; start.next("event").where(...)
// 松散连续
val relaxed: Pattern[Event, _] &#61; start.followedBy("event").where(...)
// 不确定的松散连续
val nonDetermin: Pattern[Event, _] &#61; start.followedByAny("event").where(...)
// 严格连续的NOT模式
val strictNot: Pattern[Event, _] &#61; start.notNext("not").where(...)
// 松散连续的NOT模式
val relaxedNot: Pattern[Event, _] &#61; start.notFollowedBy("not").where(...)

松散连续意味着跟着的事件中&#xff0c;只有第一个可匹配的事件会被匹配上&#xff0c;而不确定的松散连接情况下&#xff0c;有着同样起始的多个匹配会被输出。举例来说&#xff0c;模式"a b"&#xff0c;给定事件序列"a"&#xff0c;“c”&#xff0c;“b1”&#xff0c;“b2”&#xff0c;会产生如下的结果&#xff1a;

"a"和"b"之间严格连续&#xff1a; &#xff08;没有匹配&#xff09;&#xff0c;"a"之后的"c"导致"a"被丢弃。

"a"和"b"之间松散连续&#xff1a;a b1&#xff0c;松散连续会”跳过不匹配的事件直到匹配上的事件”。

"a"和"b"之间不确定的松散连续&#xff1a;a b1, a b2&#xff0c;这是最常见的情况。

也可以为模式定义一个有效时间约束。例如&#xff0c;你可以通过pattern.within()方法指定一个模式应该在10秒内发生。这种时间模式支持处理时间和事件时间.

next.within(Time.seconds(10))循环模式中的连续性

对于循环模式&#xff08;例如oneOrMore()和times())&#xff09;&#xff0c;默认是松散连续。如果想使用严格连续&#xff0c;你需要使用consecutive()方法明确指定&#xff0c; 如果想使用不确定松散连续&#xff0c;你可以使用allowCombinations()方法。

consecutive

与oneOrMore()和times()一起使用&#xff0c; 在匹配的事件之间施加严格的连续性&#xff0c; 也就是说&#xff0c;任何不匹配的事件都会终止匹配&#xff08;和next()一样&#xff09;。如果不使用它&#xff0c;那么就是松散连续&#xff08;和followedBy()一样&#xff09;

Pattern.begin("start")
.where(_.getName().equals("c"))
.followedBy("middle")
.where(_.getName().equals("a"))
.oneOrMore()
.consecutive()
.followedBy("end1")
.where(_.getName().equals("b"))

输入&#xff1a;C D A1 A2 A3 D A4 B&#xff0c;会产生下面的输出&#xff1a;如果施加严格连续性&#xff1a; C A1 B&#xff0c;C A1 A2 B&#xff0c;C A1 A2 A3 B不施加严格连续性&#xff1a; C A1 B&#xff0c;C A1 A2 B&#xff0c;C A1 A2 A3 B&#xff0c;C A1 A2 A3 A4 BallowCombinations

与oneOrMore()和times()一起使用&#xff0c; 在匹配的事件中间施加不确定松散连续性&#xff08;和followedByAny()一样&#xff09;。如果不使用&#xff0c;就是松散连续&#xff08;和followedBy()一样&#xff09;

Pattern.begin("start")
.where(_.getName().equals("c"))
.followedBy("middle")
.where(_.getName().equals("a"))
.oneOrMore()
.allowCombinations()
.followedBy("end1").where(_.getName().equals("b"))

输入&#xff1a;C D A1 A2 A3 D A4 B&#xff0c;会产生如下的输出&#xff1a;如果使用不确定松散连续&#xff1a; C A1 B&#xff0c;C A1 A2 B&#xff0c;C A1 A3 B&#xff0c;C A1 A4 B&#xff0c;C A1 A2 A3 B&#xff0c;C A1 A2 A4 B&#xff0c;C A1 A3 A4 B&#xff0c;C A1 A2 A3 A4 B如果不使用&#xff1a;C A1 B&#xff0c;C A1 A2 B&#xff0c;C A1 A2 A3 B&#xff0c;C A1 A2 A3 A4 B5 匹配后跳过策略

对于一个给定的模式&#xff0c;同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上&#xff0c;你需要指定跳过策略AfterMatchSkipStrategy。有五种跳过策略&#xff0c;如下&#xff1a;

NO_SKIP: 每个成功的匹配都会被输出。
SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。
SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。

val skipStrategy &#61; AfterMatchSkipStrategy.skipToFirst("patternName").throwExceptionOnMiss()Pattern.begin("patternName", skipStrategy)

例如&#xff0c;给定一个模式b&#43; c和一个数据流b1 b2 b3 c&#xff0c;不同跳过策略之间的不同如下&#xff1a;


5 模式的检测

在指定了要寻找的模式后&#xff0c;该把它们应用到输入流上来发现可能的匹配了。为了在事件流上运行你的模式&#xff0c;需要创建一个PatternStream。给定一个输入流input&#xff0c;一个模式pattern和一个可选的用来对使用事件时间时有同样时间戳或者同时到达的事件进行排序的比较器comparator

object Cep
def main(args: Array[String]): Unit &#61;
val env: StreamExecutionEnvironment &#61; StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1)
val loginDS: DataStream[LoginInfo] &#61; env.fromElements("") .map(data &#61;>
val datas: Array[String] &#61; data.split(",") LoginInfo(datas(0), datas(1), datas(2).toInt,datas(3).toLong) )
// 创建规则
val pattern: Pattern[LoginInfo, LoginInfo] &#61; Pattern.begin[LoginInfo]("start")
// 增加新的条件&#xff0c;多个条件应该同时满足
.where(_.age > 1) .where(_.age < 12)
// 应用规则&#xff1a;将规则应用到数据流中
val loginPS: PatternStream[LoginInfo] &#61; CEP.pattern(loginDS,pattern)
// 获取规则的结果
val ds: DataStream[String] &#61; loginPS.select(
map &#61;>
map.toString ) ds.print() env.execute()


case class LoginInfo( id:String, name:String, age:Int, loginTime:Long
)

6 从模式中选取

创建 PatternStream 之后&#xff0c;就可以应用 select 或者 flatselect 方法&#xff0c;从检测到的事件序列中提取事件了

select() 方法需要输入一个 select function 作为参数&#xff0c;每个成功匹配的事件序列都会调用它

select() 以一个 Map[String&#xff0c;Iterable [IN]] 来接收匹配到的事件序列&#xff0c;其中 key 就是每个模式的名称&#xff0c;而 value 就是所有接收到的事件的 Iterable 类型

val loginPS: PatternStream[LoginInfo] &#61; CEP.pattern(loginDS,pattern)
val outputTag: OutputTag[String] &#61; OutputTag[String]("side-output")
val result: SingleOutputStreamOperator[CompleLogin] &#61;loginPS.select(outputTag) (pattern: Map[String, Iterable[LoginInfo]], timestamp: Long, out: Collector[TimeoutLogin]) &#61;> out.collect(TimeoutLogin()) (pattern: mutable.Map[String, Iterable[LoginInfo]], out: Collector[CompleLogin]) &#61;> out.collect(CompleLogin())
val timeoutResult: DataStream[TimeoutLogin] &#61;result.getSideOutput(outputTag)

推荐阅读
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • Python爬虫中使用正则表达式的方法和注意事项
    本文介绍了在Python爬虫中使用正则表达式的方法和注意事项。首先解释了爬虫的四个主要步骤,并强调了正则表达式在数据处理中的重要性。然后详细介绍了正则表达式的概念和用法,包括检索、替换和过滤文本的功能。同时提到了re模块是Python内置的用于处理正则表达式的模块,并给出了使用正则表达式时需要注意的特殊字符转义和原始字符串的用法。通过本文的学习,读者可以掌握在Python爬虫中使用正则表达式的技巧和方法。 ... [详细]
  • 本文详细介绍了使用C#实现Word模版打印的方案。包括添加COM引用、新建Word操作类、开启Word进程、加载模版文件等步骤。通过该方案可以实现C#对Word文档的打印功能。 ... [详细]
  • node.jsurlsearchparamsAPI哎哎哎 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 本文详细介绍了在ASP.NET中获取插入记录的ID的几种方法,包括使用SCOPE_IDENTITY()和IDENT_CURRENT()函数,以及通过ExecuteReader方法执行SQL语句获取ID的步骤。同时,还提供了使用这些方法的示例代码和注意事项。对于需要获取表中最后一个插入操作所产生的ID或马上使用刚插入的新记录ID的开发者来说,本文提供了一些有用的技巧和建议。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
author-avatar
mobiledu2502911637
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有